Publishing over MQTT Bridge
This section explains how devices can use the MQTT bridge to communicate with Omnicore. For general information about HTTP and MQTT, see Protocols.
Be sure to refer to the API documentation for full details about each method described in this section. See also the MQTT-related samples.
To publish over the MQTT bridge:
Install an MQTT client on your device.
Download an MQTT server certificate onto your device.
Configure the MQTT client to authenticate the device to Omnicore.
Initiate a TLS handshake over hostprefix.mqtt.korewireless.com or a long-term support domain.
Publish telemetry events or set the device state.
MQTT server
Omnicore supports the MQTT protocol by running a managed broker that listens to the port hostprefix.mqtt.korewireless.com:8883. Port 8883 is the standard TCP port reserved with IANA for secure MQTT connections. Connections to this port must use TLS transport, which is supported by open source clients like Eclipse Paho.
The MQTT standard is defined for implementing a full publish/subscribe broker. However, the managed MQTT bridge run by Omnicore does not support all publish/subscribe operations, such as creating arbitrary topics that devices can use to send messages between them. (Filtering can be accomplished with downstream processes running on Cloud Pub/Sub.) Omnicore uses a predefined set of topics and specific topic formats.
Quality of Service (QoS)
The MQTT specification describes three Quality of Service (QoS) levels:
- QoS 0, delivered at most once
- QoS 1, delivered at least once
- QoS 2, delivered exactly once
Omnicore does not support QoS 2. Publishing QoS 2 messages closes the connection. Subscribing to a predefined topic with QoS 2 downgrades the QoS level to QoS 1.
QoS 0 and 1 function as follows in Omnicore:
- A PUBLISH message with QoS 1 will be acknowledged by the PUBACK message after it has been successfully
sent to Mqtt Bridge.
- PUBLISH messages with QoS 0 do not require PUBACK responses, and may be dropped if there is any jitter
along the message delivery path (for example, if Cloud Pub/Sub is temporarily unavailable).
- The Omnicore MQTT bridge maintains a small buffer of undelivered messages in order to retry them.
If the buffer becomes full, the message with QoS 1 may be dropped and a PUBACK message will not be sent to
the client. The client is expected to resend the message.
For device configurations, QoS levels are as follows:
- When QoS is 0, a given configuration version will be published to the device only once.
A QoS of 0 is thus useful when a configuration is frequently updated (on the order of seconds
or minutes) and it's not necessary for the device to receive every update.
- When QoS is 1, the latest configuration update will be retried until the device acknowledges it with a
PUBACK.
This level is the safest mode for device configurations: it guarantees that the device will
eventually get the latest configuration.
Downloading MQTT server certificates
To use TLS transport, devices must verify Omnicore server certificates to ensure they're communicating with Omnicore rather than an impersonator. The following certificate packages support verification:
- The CA certification package (128 KB) for hostprefix.mqtt.korewireless.com is available at https://pki.cloud.korewireless.com/roots.pem
- This package establishes the chain of trust to communicate with Omnicore.
- Devices with the complete root CA certification package communicate directly with the MQTT server.
- This package is regularly updated.
After downloading Omnicore CA certificates to your device, you can configure an MQTT client to authenticate the device, connect to the MQTT server, and communicate over the MQTT bridge.
Publishing telemetry events
After the device is configured with an MQTT client and connected to the MQTT bridge, it can publish a telemetry event by issuing a PUBLISH message to an MQTT topic in the following format:
/REGISTRY_ID/DEVICE_ID/events
The device ID is the string ID of the device specified in the MQTT client ID. The device ID is case sensitive.
Messages published to this MQTT topic are forwarded to the corresponding registry's default telemetry topic. If no default Pub/Sub topic exists, published telemetry data will be lost. To publish messages to other Cloud Pub/Sub topics, see Publishing telemetry events to additional Cloud Pub/Sub topics.
The forwarded message data field contains a copy of the message published by the device, and the following message attributes are added to each message in the Cloud Pub/Sub topic:
Atribute | Description |
---|---|
deviceId | The user-defined string identifier for the device, for example, thing1. The device ID must be unique within the registry. |
deviceNumId | The server-generated numeric ID of the device. When you create a device, Omnicore automatically generates the device numeric ID; it's globally unique and not editable. |
deviceRegistryId | The user-defined string identifier for the device registry, for example, registry1. |
subscriptionId | The string ID of the subscription that owns the registry and device. |
subFolder | The subfolder can be used as an event category or classification. For MQTT clients, the subfolder is the subtopic after /REGISTRY_ID/DEVICE_ID/events, which is copied directly. For example, if the client publishes to the MQTT topic /REGISTRY_ID/DEVICE_ID/events/alerts, the subfolder is the string alerts. |
The following sample shows how to send PUBLISH messages through the MQTT connection:
package main
import (
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
"log"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
jwt "github.com/golang-jwt/jwt"
)
var (
deviceID = flag.String("device", "", "Omnicore Device ID")
bridge = struct {
host *string
port *string
}{
flag.String("mqtt_host", "hostprefix.mqtt.korewireless.com", "MQTT Bridge Host"),
flag.String("mqtt_port", "8883", "MQTT Bridge Port"),
}
subscriptionID = flag.String("subscription", "", "Omnicore Subscription ID")
registryID = flag.String("registry", "", "Omnicore Registry ID (short form)")
certsCA = flag.String("ca_certs", "", "Download https://pki.cloud.korewireless.com/roots.pem")
privateKey = flag.String("private_key", "", "Path to private key file")
)
func main() {
log.Println("[main] Entered")
log.Println("[main] Flags")
flag.Parse()
log.Println("[main] Loading Omnicore's roots")
certpool := x509.NewCertPool()
pemCerts, err := ioutil.ReadFile(*certsCA)
if err == nil {
certpool.AppendCertsFromPEM(pemCerts)
}
log.Println("[main] Creating TLS Config")
config := &tls.Config{
RootCAs: certpool,
ClientAuth: tls.NoClientCert,
ClientCAs: nil,
InsecureSkipVerify: true,
Certificates: []tls.Certificate{},
MinVersion: tls.VersionTLS12,
}
clientID := fmt.Sprintf("subscriptions/%v/registries/%v/devices/%v",
*subscriptionID,
*registryID,
*deviceID,
)
log.Println("[main] Creating MQTT Client Options")
opts := MQTT.NewClientOptions()
broker := fmt.Sprintf("ssl://%v:%v", *bridge.host, *bridge.port)
log.Printf("[main] Broker '%v'", broker)
opts.AddBroker(broker)
opts.SetClientID(clientID).SetTLSConfig(config)
opts.SetUsername("unused")
token := jwt.New(jwt.SigningMethodRS256)
token.Claims = jwt.StandardClaims{
IssuedAt: time.Now().Unix(),
ExpiresAt: time.Now().Add(24 * time.Hour).Unix(),
}
log.Println("[main] Load Private Key")
keyBytes, err := ioutil.ReadFile(*privateKey)
if err != nil {
log.Fatal(err)
}
log.Println("[main] Parse Private Key")
key, err := jwt.ParseRSAPrivateKeyFromPEM(keyBytes)
if err != nil {
log.Fatal(err)
}
log.Println("[main] Sign String")
tokenString, err := token.SignedString(key)
if err != nil {
log.Fatal(err)
}
opts.SetPassword(tokenString)
// Incoming
opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
fmt.Printf("[handler] Topic: %v\n", msg.Topic())
fmt.Printf("[handler] Payload: %v\n", msg.Payload())
})
log.Println("[main] MQTT Client Connecting")
client := MQTT.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
topic := struct {
config string
telemetry string
loopback string
}{
config: fmt.Sprintf("/%s/%s/config", *registryId,*deviceID),
telemetry: fmt.Sprintf("/%s/%s/events", *registryId,*deviceID),
loopback: fmt.Sprintf("/%s/%s/loopback", *registryId,*deviceID),
}
log.Println("[main] Publishing Messages To Events Telemetry Topic")
for i := 0; i < 10; i++ {
log.Printf("[main] Publishing Message #%d", i)
token := client.Publish(
topic.telemetry,
0,
false,
fmt.Sprintf("Message: %d", i))
token.WaitTimeout(5 * time.Second)
}
log.Println("[main] MQTT Client Disconnecting")
client.Disconnect(250)
log.Println("[main] Done")
Publishing telemetry events to additional Cloud Pub/Sub topics
Devices can publish data to additional Cloud Pub/Sub topics. By default, MQTT messages published to /REGISTRY_ID/DEVICE_ID/events are forwarded to the corresponding registry's default telemetry topic. You can specify a subfolder in the MQTT topic to forward data to additional Cloud Pub/Sub topics. The subfolder is the subtopic after /REGISTRY_ID/DEVICE_ID/events.
Messages published to a subfolder are forwarded to the Cloud Pub/Sub topic with the same name. The corresponding registry must be configured with the Cloud Pub/Sub topic; otherwise, messages are forwarded to the default Cloud Pub/Sub topic.
Messages are forwarded to the default Cloud Pub/Sub topic instead of the additional Cloud Pub/Sub topic in the following cases:
No subfolder is specified in the MQTT topic
A subfolder is specified in the MQTT topic, but it doesn't have a matching Pub/Sub topic in the device registry
For example, if the device publishes to the MQTT topic /REGISTRY_ID/DEVICE_ID/events/alerts, the subfolder is the string alerts. Messages are forwarded to the additional Cloud Pub/Sub topic if the subfolder alerts and a specific Cloud Pub/Sub topic mapped to it is present. Otherwise, messages are forwarded to the default Cloud Pub/Sub topic.
Setting device state
Connected devices can report device state by issuing a PUBLISH message to the following MQTT topic:
/REGISTRY_ID/DEVICE_ID/state
To categorize and retrieve state messages, configure the registry with a device state topic. The device state topic is the Cloud Pub/Sub topic specified in the State Notification field. If the registry is configured with a device state topic, these messages are forwarded to the matching Cloud Pub/Sub topic on a best-effort basis.
Subfolders are not supported for device state messages.
For more details on retrieving state messages, see Getting device state.
Limiting MQTT traffic
Omnicore limits subscriptions that generate excessive load. When devices retry failed operations without waiting, they can trigger limits that affect all devices in the same Omnicore tenant.
For retries, you are strongly encouraged to implement a truncated exponential backoff algorithm with introduced jitter.
Keep-alive
When sending the initial MQTT CONNECT message from a client, you can supply an optional "keep-alive" value. This value is a time interval, measured in seconds, during which the broker expects a client to send a message, such as a PUBLISH message. If no message is sent from the client to the broker during the interval, the broker automatically closes the connection. Note that the keep-alive value you specify is multiplied by 1.5, so setting a 10-minute keep-alive actually results in a 15 minute interval.
For more information, see the MQTT specification.
Client settings
Omnicore does not supply its own default keep-alive value; if you choose to specify a keep-alive interval, you must set it in the client.
For best results, set the client's keep-alive interval to a minimum of 60 seconds. Many open source client libraries, including the Paho MQTT libraries for C, Python, Node.js, and Java, use 60 seconds by default.